/*
 * Created on Tue Mar 26 2024
 *
 * This file is a part of Skytable
 * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
 * NoSQL database written by Sayan Nandan ("the Author") with the
 * vision to provide flexibility in data modelling without compromising
 * on performance, queryability or scalability.
 *
 * Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 *
*/

use {
    super::{SimpleDB, SimpleDBJournal},
    crate::{
        engine::{
            error::ErrorKind,
            fractal,
            storage::{
                common::{
                    interface::{
                        fs::{File, FileExt, FileSystem, FileWrite, FileWriteExt},
                        vfs_utils,
                    },
                    sdss::sdss_r1::FileSpecV1,
                },
                v2::raw::journal::{
                    create_journal, open_journal,
                    raw::{
                        debug_get_offsets, debug_get_trace, debug_set_offset_tracking, DriverEvent,
                        DriverEventKind, JournalReaderTraceEvent, JournalWriterTraceEvent,
                        RawJournalWriter,
                    },
                    repair_journal, JournalRepairMode, JournalSettings, RawJournalAdapter,
                    RepairResult,
                },
            },
            RuntimeResult,
        },
        IoResult,
    },
    std::{collections::BTreeMap, io::ErrorKind as IoErrorKind, ops::Range},
};

/// The number of push events that the multi-event test cases emulate
const TRIALS: usize = 100;
/// Number of elements left after an event mix: see impl of [`apply_event_mix`], which removes
/// every 10th element
const POST_TRIALS_SIZE: usize = TRIALS - (TRIALS / 10);
/// A test key used by the single-event test cases
const KEY: &str = "1234567890-0987654321";
/// The `SIZE` constant of the [`FileSpecV1`] spec that [`SimpleDBJournal`] uses (via its
/// [`RawJournalAdapter::Spec`]). Presumably this is the size of the on-disk file header: it is
/// subtracted from a recorded offset to obtain the size of a single event.
const SIMPLEDB_JOURNAL_HEADER_SIZE: usize =
    <<SimpleDBJournal as RawJournalAdapter>::Spec as FileSpecV1>::SIZE;

/// The initializer for a corruption test case
struct Initializer {
    /// Path of the "good" journal; passed to `initializer_fn` and later used to read and remove
    /// the file
    journal_id: &'static str,
    /// Creates the "good" journal at the given path and reports what was written to it
    initializer_fn: fn(&str) -> RuntimeResult<InitializerInfo>,
    /// Upper bound (inclusive) for the trim sizes that are tried against this journal
    last_event_size: usize,
}

/// Information about the modified journal generated by an [`Initializer`]
#[derive(Debug)]
struct ModifiedJournalInfo {
    /// what the initializer reported about the "good" journal
    init: InitializerInfo,
    /// layout details of the modified copy
    storage: ModifiedJournalStorageInfo,
    /// position of the initializer in the list of initializers passed to the emulator
    initializer_id: usize,
}

impl ModifiedJournalInfo {
    /// Bundle the initializer's report, the storage layout and the initializer's index
    fn new(
        init: InitializerInfo,
        storage: ModifiedJournalStorageInfo,
        initializer_id: usize,
    ) -> Self {
        ModifiedJournalInfo {
            initializer_id,
            storage,
            init,
        }
    }
}

/// Information about the initial state of a "good journal". Generated from [`Initializer`]
#[derive(Debug, Clone, Copy)]
struct InitializerInfo {
    /// ID of the event that the test case corrupts
    corrupted_event_id: u64,
    /// ID of the last event that the initializer executed
    last_executed_event_id: u64,
}

impl InitializerInfo {
    /// The corrupted event is also the last executed event
    fn new_last_event(last_event_id: u64) -> Self {
        InitializerInfo {
            corrupted_event_id: last_event_id,
            last_executed_event_id: last_event_id,
        }
    }
    /// The corrupted event and the last executed event are specified independently
    fn new(corrupted_event_id: u64, last_executed_event_id: u64) -> Self {
        InitializerInfo {
            corrupted_event_id,
            last_executed_event_id,
        }
    }
    /// Returns true if the corrupted event is not the last executed event
    fn not_last_event(&self) -> bool {
        let Self {
            corrupted_event_id,
            last_executed_event_id,
        } = *self;
        corrupted_event_id != last_executed_event_id
    }
}

impl Initializer {
    fn new(
        name: &'static str,
        f: fn(&str) -> RuntimeResult<InitializerInfo>,
        last_event_size: usize,
    ) -> Self {
        Self {
            journal_id: name,
            initializer_fn: f,
            last_event_size,
        }
    }
    fn new_driver_type(name: &'static str, f: fn(&str) -> RuntimeResult<InitializerInfo>) -> Self {
        Self::new(name, f, DriverEvent::FULL_EVENT_SIZE)
    }
}

/// Build the file name of the corrupted copy of `journal_id` for the given trim size:
/// `<journal_id>-trimmed-<trim_size>.db`
fn make_corrupted_file_name(journal_id: &str, trim_size: usize) -> String {
    let mut file_name = String::from(journal_id);
    file_name.push_str("-trimmed-");
    file_name.push_str(&trim_size.to_string());
    file_name.push_str(".db");
    file_name
}

/// Create a new [`SimpleDBJournal`] journal at `journal_id` and return its writer
fn journal_init(journal_id: &str) -> RuntimeResult<RawJournalWriter<SimpleDBJournal>> {
    let writer = create_journal(journal_id)?;
    Ok(writer)
}

/// Open the existing journal at `journal_id` against `db` with the default journal settings.
/// Only the writer is returned; the second value produced by [`open_journal`] is discarded.
fn journal_open(
    journal_id: &str,
    db: &SimpleDB,
) -> RuntimeResult<RawJournalWriter<SimpleDBJournal>> {
    let (writer, _) = open_journal(journal_id, db, JournalSettings::default())?;
    Ok(writer)
}

/// Information about the layout of the modified journal
#[derive(Debug)]
struct ModifiedJournalStorageInfo {
    /// size of the "good" journal file
    original_file_size: usize,
    /// size of the modified journal file
    modified_file_size: usize,
    /// the byte range that the modification affected
    corruption_range: Range<usize>,
}

impl ModifiedJournalStorageInfo {
    /// Record the sizes of the original and modified files along with the corrupted range
    fn new(
        original_file_size: usize,
        modified_file_size: usize,
        corruption_range: Range<usize>,
    ) -> Self {
        ModifiedJournalStorageInfo {
            corruption_range,
            modified_file_size,
            original_file_size,
        }
    }
}

/**
 Emulate a sequentially varying corruption.
 - The initializer creates a modified journal and provides information about it
 - We go over each initializer and then enumerate a bunch of corruption test cases.
 - Generally, we take the size of the event, n, (it isn't necessary that it's a static size but
 should atleast be computable/traced somehow) and then shave off 1 bit, followed by upto n bytes
*/
fn emulate_sequentially_varying_single_corruption(
    initializers: impl IntoIterator<Item = Initializer>,
    modified_journal_generator_fn: impl Fn(
        &str,
        &str,
        &InitializerInfo,
        usize,
        &BTreeMap<u64, u64>,
    ) -> IoResult<ModifiedJournalStorageInfo>,
    post_corruption_handler: impl Fn(
        &str,
        &ModifiedJournalInfo,
        usize,
        SimpleDB,
        RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
    ),
    post_repair_handler: impl Fn(
        &str,
        &ModifiedJournalInfo,
        usize,
        RuntimeResult<RepairResult>,
        SimpleDB,
        RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
    ),
) {
    for (
        initializer_id,
        Initializer {
            journal_id,
            initializer_fn,
            last_event_size,
        },
    ) in initializers.into_iter().enumerate()
    {
        // initialize journal, get size and clear traces
        let initializer_info = match initializer_fn(journal_id) {
            Ok(nid) => nid,
            Err(e) => panic!(
                "failed to initialize {journal_id} due to {e}. trace: {:?}, file_data={:?}",
                debug_get_trace(),
                FileSystem::read(journal_id),
            ),
        };
        let _ = debug_get_trace();
        let original_offsets = debug_get_offsets();
        // now trim and repeat
        for trim_size in 1..=last_event_size {
            // create a copy of the "good" journal and corrupt it
            let corrupted_journal_path = make_corrupted_file_name(journal_id, trim_size);
            let open_journal_fn = |db: &SimpleDB| journal_open(&corrupted_journal_path, db);
            // modify journal
            let storage_info = modified_journal_generator_fn(
                journal_id,
                &corrupted_journal_path,
                &initializer_info,
                trim_size,
                &original_offsets,
            )
            .unwrap();
            assert_ne!(storage_info.corruption_range.len(), 0);
            assert_ne!(
                storage_info.modified_file_size,
                storage_info.original_file_size
            );
            let modified_journal_info =
                ModifiedJournalInfo::new(initializer_info, storage_info, initializer_id);
            // now let the caller handle any post corruption work
            {
                let sdb = SimpleDB::new();
                let open_journal_result = open_journal_fn(&sdb);
                post_corruption_handler(
                    journal_id,
                    &modified_journal_info,
                    trim_size,
                    sdb,
                    open_journal_result,
                );
            }
            // repair and let the caller handle post repair work
            let repair_result;
            {
                let sdb = SimpleDB::new();
                repair_result = repair_journal::<SimpleDBJournal>(
                    &corrupted_journal_path,
                    &sdb,
                    JournalSettings::default(),
                    JournalRepairMode::Simple,
                );
            }
            {
                let sdb = SimpleDB::new();
                let repaired_journal_reopen_result = open_journal_fn(&sdb);
                // let caller handle any post repair work
                post_repair_handler(
                    journal_id,
                    &modified_journal_info,
                    trim_size,
                    repair_result,
                    sdb,
                    repaired_journal_reopen_result,
                );
            }
            // we're done, delete the corrupted journal
            FileSystem::remove_file(&corrupted_journal_path).unwrap();
        }
        // delete the good journal, we're done with this one as well
        FileSystem::remove_file(journal_id).unwrap();
    }
}

/// In this emulation, we sequentially corrupt the last event across multiple trials
///
/// For every trim size, the good journal is copied and the copy is truncated by that many bytes
/// from the end of the file. The reported storage information is:
/// - `original_file_size`: size of the good journal
/// - `modified_file_size`: size of the truncated copy
/// - `corruption_range`: the byte range (in the good journal) that was cut off
///
/// See [`emulate_sequentially_varying_single_corruption`] for how the handlers are invoked.
fn emulate_final_event_corruption(
    initializers: impl IntoIterator<Item = Initializer>,
    post_corruption_handler: impl Fn(
        &str,
        &ModifiedJournalInfo,
        usize,
        SimpleDB,
        RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
    ),
    post_repair_handler: impl Fn(
        &str,
        &ModifiedJournalInfo,
        usize,
        RuntimeResult<RepairResult>,
        SimpleDB,
        RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
    ),
) {
    emulate_sequentially_varying_single_corruption(
        initializers,
        |original_journal, modified_journal, _, trim_amount, _offsets| {
            FileSystem::copy(original_journal, modified_journal)?;
            let mut f = File::open_rw(modified_journal)?;
            let real_flen = f.f_len()? as usize;
            assert!(
                trim_amount <= real_flen,
                "cannot trim {trim_amount} bytes from a file of {real_flen} bytes",
            );
            // everything from this offset up to the end of the good journal is cut off
            let modified_flen = real_flen - trim_amount;
            f.f_truncate(modified_flen as _)?;
            Ok(ModifiedJournalStorageInfo::new(
                real_flen,
                modified_flen,
                modified_flen..real_flen,
            ))
        },
        post_corruption_handler,
        post_repair_handler,
    )
}

/// In this emulation, we sequentially corrupt an intermediary event across multiple trials
///
/// For every trim size, a new file is written that contains the good journal with `trim_size`
/// bytes removed right before the offset recorded for the corrupted event; everything after that
/// offset is kept as is.
fn emulate_midway_corruption(
    initializers: impl IntoIterator<Item = Initializer>,
    post_corruption_handler: impl Fn(
        &str,
        &ModifiedJournalInfo,
        usize,
        SimpleDB,
        RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
    ),
    post_repair_handler: impl Fn(
        &str,
        &ModifiedJournalInfo,
        usize,
        RuntimeResult<RepairResult>,
        SimpleDB,
        RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
    ),
) {
    emulate_sequentially_varying_single_corruption(
        initializers,
        |good_journal, bad_journal, info, trim_size, offsets| {
            let good_bytes = FileSystem::read(good_journal)?;
            let orig_journal_size = good_bytes.len();
            let mut bad_file = File::create(bad_journal)?;
            // the recorded offset of the corrupted event marks where the removed bytes end
            let cut_end = offsets.get(&info.corrupted_event_id).copied().unwrap() as usize;
            let cut_start = cut_end - trim_size;
            // keep everything outside of [cut_start, cut_end)
            let (head, tail) = (&good_bytes[..cut_start], &good_bytes[cut_end..]);
            let new_size = head.len() + tail.len();
            assert!(
                new_size < orig_journal_size,
                "real len is {orig_journal_size} while new len is {new_size}",
            );
            assert_eq!(new_size + trim_size, orig_journal_size);
            for segment in [head, tail] {
                bad_file.fwrite_all(segment)?;
            }
            let corruption_range = cut_start..cut_end;
            assert_eq!(corruption_range.len(), trim_size);
            Ok(ModifiedJournalStorageInfo::new(
                orig_journal_size,
                new_size,
                corruption_range,
            ))
        },
        post_corruption_handler,
        post_repair_handler,
    )
}

/// Format a key as a string: `key-` followed by the number, zero-padded to at least six digits
fn keyfmt(num: usize) -> String {
    format!("key-{:0>6}", num)
}

/// Apply an event mix
/// - Push [`TRIALS`] elements (keys generated with [`keyfmt`], starting at 1)
/// - After every 10th push, pop an element again
///
/// Returns the total number of operations (pushes and pops) that were executed.
fn apply_event_mix(jrnl: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<u64> {
    let mut db = SimpleDB::new();
    let mut executed_ops: u64 = 0;
    for key_id in 1..=TRIALS {
        db.push(jrnl, keyfmt(key_id))?;
        executed_ops += 1;
        let is_tenth = key_id % 10 == 0;
        if is_tenth {
            db.pop(jrnl)?;
            executed_ops += 1;
        }
    }
    assert_eq!(db.data().len(), POST_TRIALS_SIZE);
    Ok(executed_ops)
}

#[sky_macros::test]
fn corruption_before_close() {
    let initializers = [
        Initializer::new_driver_type(
            /*
                in this case we: create, close (0), corrupt close (0)
            */
            "close_event_corruption_empty.db",
            |jrnl_id| {
                let mut jrnl = journal_init(jrnl_id)?;
                RawJournalWriter::close_driver(&mut jrnl)?;
                Ok(InitializerInfo::new_last_event(0))
            },
        ),
        Initializer::new_driver_type(
            /*
                in this case we: create, apply events ([0,99]), close (100). corrupt close (100). expect no data loss.
            */
            "close_event_corruption.db",
            |jrnl_id| {
                let mut jrnl = journal_init(jrnl_id)?;
                let operation_count = apply_event_mix(&mut jrnl)?;
                RawJournalWriter::close_driver(&mut jrnl)?;
                Ok(InitializerInfo::new_last_event(operation_count))
            },
        ),
        Initializer::new_driver_type(
            /*
                in this case we: create, close (0), reopen(1), close(2). corrupt last close (2)
            */
            "close_event_corruption_open_close_open_close.db",
            |jrnl_id| {
                // open and close
                let mut jrnl = journal_init(jrnl_id)?;
                RawJournalWriter::close_driver(&mut jrnl)?;
                drop(jrnl);
                // reinit and close
                let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?;
                RawJournalWriter::close_driver(&mut jrnl)?;
                Ok(InitializerInfo::new_last_event(2))
            },
        ),
    ];
    emulate_final_event_corruption(
        initializers,
        |journal_id, modified_journal_info, trim_size, db, open_result| {
            // open the journal and validate failure
            let open_err = open_result.unwrap_err();
            let trace = debug_get_trace();
            if trim_size > (DriverEvent::FULL_EVENT_SIZE - (sizeof!(u128) + sizeof!(u64))) {
                // the amount of trim from the end of the file causes us to lose valuable metadata
                if modified_journal_info.init.last_executed_event_id == 0 {
                    // empty log
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                    assert_eq!(
                        trace,
                        intovec![
                            JournalReaderTraceEvent::Initialized,
                            JournalReaderTraceEvent::LookingForEvent
                        ],
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    )
                } else {
                    if modified_journal_info.initializer_id == 1 {
                        // in the second case, we apply the event mix so we need to check this
                        assert_eq!(
                            db.data().len(),
                            POST_TRIALS_SIZE,
                            "failed at trim_size {trim_size} for journal {journal_id}"
                        );
                        assert_eq!(
                            *db.data().last().unwrap(),
                            keyfmt(TRIALS - 1),
                            "failed at trim_size {trim_size} for journal {journal_id}"
                        );
                    } else {
                        assert_eq!(
                            db.data().len(),
                            0,
                            "failed at trim_size {trim_size} for journal {journal_id}"
                        );
                    }
                    assert_eq!(
                        *trace.last().unwrap(),
                        JournalReaderTraceEvent::LookingForEvent.into(),
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                }
            } else {
                // the amount of trim still allows us to read some metadata
                if modified_journal_info.init.last_executed_event_id == 0 {
                    // empty log
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                    assert_eq!(
                        trace,
                        intovec![
                            JournalReaderTraceEvent::Initialized,
                            JournalReaderTraceEvent::LookingForEvent,
                            JournalReaderTraceEvent::AttemptingEvent(
                                modified_journal_info.init.corrupted_event_id
                            ),
                            JournalReaderTraceEvent::DriverEventExpectingClose,
                        ],
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    )
                } else {
                    if modified_journal_info.initializer_id == 1 {
                        // in the second case, we apply the event mix so we need to check this
                        assert_eq!(
                            db.data().len(),
                            POST_TRIALS_SIZE,
                            "failed at trim_size {trim_size} for journal {journal_id}"
                        );
                        assert_eq!(
                            *db.data().last().unwrap(),
                            keyfmt(TRIALS - 1),
                            "failed at trim_size {trim_size} for journal {journal_id}"
                        );
                    } else {
                        assert_eq!(
                            db.data().len(),
                            0,
                            "failed at trim_size {trim_size} for journal {journal_id}"
                        );
                    }
                    assert_eq!(
                        &trace[trace.len() - 3..],
                        &into_array![
                            JournalReaderTraceEvent::LookingForEvent,
                            JournalReaderTraceEvent::AttemptingEvent(
                                modified_journal_info.init.corrupted_event_id
                            ),
                            JournalReaderTraceEvent::DriverEventExpectingClose
                        ],
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                }
            }
            assert_eq!(
                open_err.kind(),
                &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()),
                "failed at trim_size {trim_size} for journal {journal_id}"
            );
        },
        |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| {
            assert_eq!(
                repair_result.expect(&format!(
                    "failed at trim_size {trim_size} for journal {journal_id}"
                )),
                RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _),
                "failed at trim_size {trim_size} for journal {journal_id}"
            );
            if modified_journal_info.init.last_executed_event_id == 0
                || modified_journal_info.initializer_id == 2
            {
                assert_eq!(
                    db.data().len(),
                    0,
                    "failed at trim_size {trim_size} for journal {journal_id}"
                );
            } else {
                // in the second case, we apply the event mix so we need to check this
                assert_eq!(
                    db.data().len(),
                    POST_TRIALS_SIZE,
                    "failed at trim_size {trim_size} for journal {journal_id}"
                );
                assert_eq!(
                    *db.data().last().unwrap(),
                    keyfmt(TRIALS - 1),
                    "failed at trim_size {trim_size} for journal {journal_id}"
                );
            }
            let _ = reopen_result.expect(&format!(
                "failed at trim_size {trim_size} for journal {journal_id}"
            ));
            // clear trace
            let _ = debug_get_trace();
            let _ = debug_get_offsets();
        },
    )
}

// Corrupt the final reopen event of a journal (by trimming 1 byte up to the full size of the
// driver event from the end of the file) and verify that:
// - if the entire reopen event is gone, the journal opens just fine (the loss is undetectable)
// - otherwise opening fails with an unexpected EOF, a repair reports the remaining bytes of the
//   reopen event as lost and the repaired journal can be reopened without any data loss
#[sky_macros::test]
fn corruption_after_reopen() {
    let initializers = [
        Initializer::new_driver_type(
            /*
                in this case we: create, close (0), reopen(1). corrupt reopen (1)
            */
            "corruption_after_reopen.db",
            |jrnl_id| {
                let mut jrnl = journal_init(jrnl_id)?;
                RawJournalWriter::close_driver(&mut jrnl)?;
                drop(jrnl);
                // reopen, but don't close
                journal_open(jrnl_id, &SimpleDB::new())?;
                Ok(InitializerInfo::new_last_event(1))
            },
        ),
        Initializer::new_driver_type(
            /*
                in this case we: create, apply the event mix (n operations, pushes and pops both count,
                so these are events [0,n-1]), close (n), reopen(n+1). corrupt reopen (n+1). expect no data loss.
            */
            "corruption_after_ropen_multi_before_close.db",
            |jrnl_id| {
                let mut jrnl = journal_init(jrnl_id)?;
                let operation_count = apply_event_mix(&mut jrnl)?;
                RawJournalWriter::close_driver(&mut jrnl)?;
                drop(jrnl);
                // reopen, but don't close
                journal_open(jrnl_id, &SimpleDB::new())?;
                Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish
            },
        ),
    ];
    emulate_final_event_corruption(
        initializers,
        |journal_id, modified_journal_info, trim_size, db, open_result| {
            let trace = debug_get_trace();
            if trim_size == DriverEvent::FULL_EVENT_SIZE {
                /*
                    IMPORTANT IFFY SITUATION: undetectable error. if an entire "correct" part of the log vanishes, it's not going to be detected.
                    while possible in theory, it's going to have to be one heck of a coincidence for it to happen in practice. the only way to work
                    around this is to use a secondary checksum. I'm not a fan of that approach either (and I don't even consider it to be a good mitigation)
                    because it can potentially violate consistency, conflicting the source of truth. for example: if we have a database crash, should we trust
                    the checksum file or the log? guarding that further requires an enormous amount of effort and it will still have holes and ironically,
                    will potentially introduce more bugs due to increased complexity. Get a good filesystem and disk controller (that attaches checksums to sectors)!
                    -- @ohsayan
                */
                let mut jrnl = open_result.unwrap_or_else(|e| {
                    panic!("failed at trim_size {trim_size} for journal {journal_id}: {e:?}")
                });
                if modified_journal_info.init.last_executed_event_id == 1 {
                    // empty log, only the reopen
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                    assert_eq!(
                        trace,
                        intovec![
                            JournalReaderTraceEvent::Initialized,
                            JournalReaderTraceEvent::LookingForEvent,
                            JournalReaderTraceEvent::AttemptingEvent(0),
                            JournalReaderTraceEvent::DriverEventExpectingClose,
                            JournalReaderTraceEvent::DriverEventCompletedBlockRead,
                            JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
                            JournalReaderTraceEvent::ClosedAndReachedEof,
                            JournalReaderTraceEvent::Completed,
                            JournalWriterTraceEvent::ReinitializeAttempt,
                            JournalWriterTraceEvent::DriverEventAttemptCommit {
                                event: DriverEventKind::Reopened,
                                event_id: modified_journal_info.init.corrupted_event_id,
                                prev_id: 0
                            },
                            JournalWriterTraceEvent::DriverEventCompleted,
                            JournalWriterTraceEvent::ReinitializeComplete,
                        ],
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                } else {
                    // we will have upto the last event since only the reopen is gone
                    assert_eq!(
                        db.data().len(),
                        POST_TRIALS_SIZE,
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                    assert_eq!(
                        *db.data().last().unwrap(),
                        keyfmt(TRIALS - 1),
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                    assert_eq!(
                        &trace[trace.len() - 12..],
                        intovec![
                            JournalReaderTraceEvent::ServerEventAppliedSuccess,
                            JournalReaderTraceEvent::LookingForEvent,
                            JournalReaderTraceEvent::AttemptingEvent(
                                modified_journal_info.init.corrupted_event_id - 1
                            ), // close event
                            JournalReaderTraceEvent::DriverEventExpectingClose,
                            JournalReaderTraceEvent::DriverEventCompletedBlockRead,
                            JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
                            JournalReaderTraceEvent::ClosedAndReachedEof,
                            JournalReaderTraceEvent::Completed,
                            JournalWriterTraceEvent::ReinitializeAttempt,
                            JournalWriterTraceEvent::DriverEventAttemptCommit {
                                event: DriverEventKind::Reopened,
                                event_id: modified_journal_info.init.corrupted_event_id,
                                prev_id: modified_journal_info.init.corrupted_event_id - 1 // close event
                            },
                            JournalWriterTraceEvent::DriverEventCompleted,
                            JournalWriterTraceEvent::ReinitializeComplete
                        ],
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                }
                // now close this so that this works with the post repair handler
                RawJournalWriter::close_driver(&mut jrnl).unwrap();
                let _ = debug_get_offsets();
                let _ = debug_get_trace();
            } else {
                assert_eq!(
                    open_result.unwrap_err().kind(),
                    &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()),
                    "failed at trim_size {trim_size} for journal {journal_id}"
                );
                if modified_journal_info.init.last_executed_event_id == 1 {
                    // empty log, only the reopen
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                    assert_eq!(
                        trace,
                        intovec![
                            JournalReaderTraceEvent::Initialized,
                            JournalReaderTraceEvent::LookingForEvent,
                            JournalReaderTraceEvent::AttemptingEvent(0),
                            JournalReaderTraceEvent::DriverEventExpectingClose,
                            JournalReaderTraceEvent::DriverEventCompletedBlockRead,
                            JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
                            JournalReaderTraceEvent::DriverEventExpectingReopenBlock,
                            JournalReaderTraceEvent::AttemptingEvent(
                                modified_journal_info.init.corrupted_event_id
                            )
                        ],
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                } else {
                    // we will have upto the last event since only the reopen is gone
                    assert_eq!(
                        db.data().len(),
                        POST_TRIALS_SIZE,
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                    assert_eq!(
                        *db.data().last().unwrap(),
                        keyfmt(TRIALS - 1),
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                    assert_eq!(
                        &trace[trace.len() - 5..],
                        intovec![
                            JournalReaderTraceEvent::DriverEventExpectingClose,
                            JournalReaderTraceEvent::DriverEventCompletedBlockRead,
                            JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
                            JournalReaderTraceEvent::DriverEventExpectingReopenBlock,
                            JournalReaderTraceEvent::AttemptingEvent(
                                modified_journal_info.init.corrupted_event_id
                            )
                        ],
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                }
            }
        },
        |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| {
            assert!(
                reopen_result.is_ok(),
                "failed at trim_size {trim_size} for journal {journal_id}: {:?}",
                reopen_result.as_ref().err()
            );
            let expected_repair_result = if trim_size == DriverEvent::FULL_EVENT_SIZE {
                // the whole reopen event is gone, which is undetectable (see the comment in the
                // post corruption handler of this test), so there is nothing to repair
                RepairResult::NoErrors
            } else {
                // whatever is left of the reopen event is what the repair has to throw away
                RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as u64)
            };
            assert_eq!(
                repair_result.unwrap_or_else(|e| panic!(
                    "failed at trim_size {trim_size} for journal {journal_id}: {e:?}"
                )),
                expected_repair_result,
                "failed at trim_size {trim_size} for journal {journal_id}"
            );
            if modified_journal_info.init.last_executed_event_id == 1 {
                assert_eq!(
                    db.data().len(),
                    0,
                    "failed at trim_size {trim_size} for journal {journal_id}"
                );
            } else {
                assert_eq!(
                    db.data().len(),
                    POST_TRIALS_SIZE,
                    "failed at trim_size {trim_size} for journal {journal_id}"
                );
                assert_eq!(
                    *db.data().last().unwrap(),
                    keyfmt(TRIALS - 1),
                    "failed at trim_size {trim_size} for journal {journal_id}"
                );
            }
            let _ = debug_get_trace();
            let _ = debug_get_offsets();
        },
    )
}

// Final-event corruption for journals that were never closed.
//
// The test first derives `offset` (the last tracked write offset after a single `KEY` push,
// minus the journal header size) and hands it to every initializer. It then drives
// `emulate_final_event_corruption` with two callbacks:
// - the first one runs after the corrupted journal was opened and checks the open error,
//   the recovered data and the reader trace
// - the second one runs after repair + reopen and checks the reported loss and the data
#[sky_macros::test]
fn corruption_at_runtime() {
    // first get the offsets to compute the size of the event
    let offset = {
        debug_set_offset_tracking(true);
        let mut sdb = SimpleDB::new();
        let mut jrnl = create_journal("corruption_at_runtime_test_log.db").unwrap();
        sdb.push(&mut jrnl, KEY).unwrap();
        let (_, offset) = debug_get_offsets().pop_last().unwrap();
        let ret = offset as usize - SIMPLEDB_JOURNAL_HEADER_SIZE;
        debug_set_offset_tracking(false);
        // fetch (and discard) the trace produced while measuring
        let _ = debug_get_trace();
        ret
    };
    let initializers = [
        Initializer::new(
            /*
                for this one we:
                - PRC1: we create and apply one event (0)

                expect data loss (0).
            */
            "corruption_at_runtime_open_commit_corrupt",
            |jrnl_id| {
                let mut sdb = SimpleDB::new();
                let mut jrnl = create_journal(jrnl_id)?;
                sdb.push(&mut jrnl, KEY)?;
                // don't close
                Ok(InitializerInfo::new_last_event(0))
            },
            offset,
        ),
        Initializer::new(
            /*
                for this one we:
                - PRC1: we create and apply events ([0,99])
                expect data loss (99)
            */
            "corruption_at_runtime_open_multi_commit_then_corrupt",
            |jrnl_id| {
                let mut op_count = 0;
                let mut sdb = SimpleDB::new();
                let mut jrnl = create_journal(jrnl_id)?;
                for _ in 1..=TRIALS {
                    sdb.push(&mut jrnl, KEY)?;
                    op_count += 1;
                }
                // don't close
                Ok(InitializerInfo::new_last_event(op_count))
            },
            offset,
        ),
    ];
    emulate_final_event_corruption(
        initializers,
        |journal_id, modified_journal_info, trim_size, db, open_result| {
            let trace = debug_get_trace();
            // opening a journal with a damaged final event must fail with an unexpected EOF
            let err = open_result.unwrap_err();
            assert_eq!(
                err.kind(),
                &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()),
                "failed for journal {journal_id} with trim_size {trim_size}"
            );
            // NOTE(review): the threshold is the event size minus (u128 + u64); presumably this is
            // the point past which the reader can no longer even begin attempting the event —
            // confirm against the event layout. The two branches only differ in the expected trace.
            if trim_size > offset - (sizeof!(u128) + sizeof!(u64)) {
                if modified_journal_info.init.last_executed_event_id == 0 {
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    );
                    assert_eq!(
                        trace,
                        intovec![
                            JournalReaderTraceEvent::Initialized,
                            JournalReaderTraceEvent::LookingForEvent,
                        ],
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    )
                } else {
                    // we lost the last server event, so we'll have one key less
                    assert_eq!(
                        db.data().len(),
                        TRIALS - 1,
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    );
                    assert_eq!(
                        db.data()[TRIALS - 2],
                        KEY,
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    );
                    // only the tail of the trace is checked: the previous event is fully applied
                    // and the reader stops while looking for the next one
                    assert_eq!(
                        &trace[trace.len() - 4..],
                        intovec![
                            JournalReaderTraceEvent::DetectedServerEvent,
                            JournalReaderTraceEvent::ServerEventMetadataParsed,
                            JournalReaderTraceEvent::ServerEventAppliedSuccess,
                            JournalReaderTraceEvent::LookingForEvent,
                        ],
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    )
                }
            } else {
                if modified_journal_info.init.last_executed_event_id == 0 {
                    // empty log
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    );
                    assert_eq!(
                        trace,
                        intovec![
                            JournalReaderTraceEvent::Initialized,
                            JournalReaderTraceEvent::LookingForEvent,
                            JournalReaderTraceEvent::AttemptingEvent(0),
                            JournalReaderTraceEvent::DetectedServerEvent,
                            JournalReaderTraceEvent::ServerEventMetadataParsed,
                        ],
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    );
                } else {
                    // we lost the last server event, so we'll have one key less
                    assert_eq!(
                        db.data().len(),
                        TRIALS - 1,
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    );
                    assert_eq!(
                        db.data()[TRIALS - 2],
                        KEY,
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    );
                    // only the tail of the trace is checked: the reader gets as far as parsing
                    // the metadata of the event it was attempting
                    assert_eq!(
                        &trace[trace.len() - 4..],
                        intovec![
                            JournalReaderTraceEvent::LookingForEvent,
                            JournalReaderTraceEvent::AttemptingEvent(
                                modified_journal_info.init.corrupted_event_id - 1
                            ),
                            JournalReaderTraceEvent::DetectedServerEvent,
                            JournalReaderTraceEvent::ServerEventMetadataParsed,
                        ],
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    );
                }
            }
        },
        |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| {
            assert!(reopen_result.is_ok());
            // the panic message is only built if the repair actually failed
            assert_eq!(
                repair_result.unwrap_or_else(|e| panic!(
                    "failed at trim_size {trim_size} for journal {journal_id}: {e:?}"
                )),
                RepairResult::UnspecifiedLoss((offset - trim_size) as u64),
                "failed for journal {journal_id} with trim_size {trim_size}"
            );
            if modified_journal_info.init.last_executed_event_id == 0 {
                assert_eq!(
                    db.data().len(),
                    0,
                    "failed for journal {journal_id} with trim_size {trim_size}"
                );
            } else {
                assert_eq!(
                    db.data().len(),
                    TRIALS - 1,
                    "failed for journal {journal_id} with trim_size {trim_size}"
                );
                assert_eq!(
                    db.data()[TRIALS - 2],
                    KEY,
                    "failed for journal {journal_id} with trim_size {trim_size}"
                );
            }
            // fetch (and discard) the trace before the next iteration
            let _ = debug_get_trace();
        },
    )
}

/*
    midway corruption tests
    ---
    while in the prior tests we tested cases where the last event was corrupted, we now test cases where some middle
    portion of the journal gets corrupted. the trouble is that we'll have to enumerate all cases for generated traces...
    which is absolutely not feasible. Instead, we just ensure that the pre and post states are valid.
*/

// Midway corruption of a driver close event.
//
// Three journals are prepared (see the comment on each initializer) and passed to
// `emulate_midway_corruption` with offset tracking enabled. The first callback checks the state
// after opening the corrupted journal (the open must fail); the second callback checks the state
// after repair + reopen, including the amount of loss reported by the repair.
#[sky_macros::test]
fn midway_corruption_close() {
    let initializers = [
        Initializer::new_driver_type("midway_corruption_close_direct", |jrnl_id| {
            /*
                in this test corruption case we:
                - PR cycle 1: create and close (0)
                - PR cycle 2: open (1) and close (2)
                we emulate a sequential corruption case for (0)
            */
            // create and close
            let mut jrnl = journal_init(jrnl_id)?;
            RawJournalWriter::close_driver(&mut jrnl)?;
            drop(jrnl);
            // reopen and close
            let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?;
            RawJournalWriter::close_driver(&mut jrnl)?;
            drop(jrnl);
            Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close
        }),
        Initializer::new_driver_type(
            /*
                in this test case we:
                - PR cycle 1: create and close (0)
                - PR cycle 2: reopen (1), apply events([2,101]), close (102)
                - PR cycle 3: reopen (103), close (104)
                we emulate a sequential corruption case for (102). expect all events to persist (<= 101)
            */
            "midway_corruption_close_events_before_second_close",
            |jrnl_id| {
                {
                    // create and close
                    let mut jrnl = journal_init(jrnl_id)?;
                    RawJournalWriter::close_driver(&mut jrnl)?; // (0)
                }
                let op_cnt;
                {
                    // reopen, apply mix and close
                    let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1)
                    op_cnt = apply_event_mix(&mut jrnl)?;
                    RawJournalWriter::close_driver(&mut jrnl)?; // <-- (op_cnt + 2) corrupt this one
                }
                {
                    // reopen and close
                    let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (op_cnt + 3)
                    RawJournalWriter::close_driver(&mut jrnl)?; // (op_cnt + 4)
                }
                Ok(InitializerInfo::new(op_cnt + 2, op_cnt + 4))
            },
        ),
        Initializer::new_driver_type(
            /*
                in this test case:
                - PR cycle 1: create and close (0)
                - PR cycle 2: reopen (1) and close (2)
                - PR cycle 3: reopen(3), apply events([4,103]), close(104)
                we emulate a sequential corruption of (2) which results in a catastrophic corruption. expect major
                data loss (==TRIALS)
            */
            "midway_corruption_close_events_before_third_close",
            |jrnl_id| {
                {
                    // create and close
                    let mut jrnl = journal_init(jrnl_id)?;
                    RawJournalWriter::close_driver(&mut jrnl)?; // (0)
                }
                {
                    // reopen and close
                    let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1)
                    RawJournalWriter::close_driver(&mut jrnl)?; // <-- (2) corrupt this one
                }
                let op_cnt;
                {
                    let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (3)
                    op_cnt = apply_event_mix(&mut jrnl)?; // (3 + op_count)
                    RawJournalWriter::close_driver(&mut jrnl)?; // (4 + op_count)
                }
                Ok(InitializerInfo::new(2, op_cnt + 4)) // corrupt the second close event
            },
        ),
    ];
    debug_set_offset_tracking(true);
    emulate_midway_corruption(
        initializers,
        |journal_id, modified_journal_info, trim_size, db, open_result| {
            assert!(
                open_result.is_err(),
                "failed for journal {journal_id} with trim_size {trim_size}"
            );
            // `initializer_id` selects the expectation; presumably it is the position of the
            // initializer in the array above (verify against `emulate_midway_corruption`)
            match modified_journal_info.initializer_id {
                0 | 2 => {
                    // in the first and third case, (0) no data is present (2) all data is lost
                    // all data will be lost, so the DB will be empty
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    );
                }
                1 => {
                    // in this case, all elements will be preserved
                    assert_eq!(
                        *db.data().last().unwrap(),
                        keyfmt(TRIALS - 1),
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                }
                _ => panic!(),
            }
            // fetch (and discard) the debug state before the next step
            let _ = debug_get_offsets();
            let _ = debug_get_trace();
        },
        |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| {
            // failure messages are built lazily, i.e. only when the result is actually an error
            let _ = reopen_result.unwrap_or_else(|e| {
                panic!("failed at trim_size {trim_size} for journal {journal_id}: {e:?}")
            });
            match modified_journal_info.initializer_id {
                0 | 2 => {
                    // all data will be lost, so the DB will be empty
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed for journal {journal_id} with trim_size {trim_size}"
                    );
                    if modified_journal_info.init.corrupted_event_id == 0
                        && modified_journal_info.init.not_last_event()
                    {
                        // the first event was corrupted
                        assert_eq!(
                            repair_result.unwrap_or_else(|e| panic!(
                                "failed at trim_size {trim_size} for journal {journal_id}: {e:?}"
                            )),
                            RepairResult::UnspecifiedLoss(
                                ((DriverEvent::FULL_EVENT_SIZE * 3) - trim_size) as u64
                            ),
                            "failed for journal {journal_id} with trim_size {trim_size}"
                        );
                    } else {
                        // this is a serious midway corruption with major data loss
                        let full_log_size = File::open_rw(journal_id).unwrap().f_len().unwrap();
                        assert_eq!(
                            repair_result.unwrap_or_else(|e| panic!(
                                "failed at trim_size {trim_size} for journal {journal_id}: {e:?}"
                            )),
                            RepairResult::UnspecifiedLoss(
                                full_log_size
                                    - SIMPLEDB_JOURNAL_HEADER_SIZE // account for header
                                        as u64
                                    - (DriverEvent::FULL_EVENT_SIZE * 2) as u64 // account for close (0), reopen(1)
                                    - trim_size as u64 // account for trim
                            ),
                            "failed for journal {journal_id} with trim_size {trim_size}"
                        );
                    }
                }
                1 => {
                    // in this case, all elements will be preserved
                    assert_eq!(
                        *db.data().last().unwrap(),
                        keyfmt(TRIALS - 1),
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                }
                _ => panic!(),
            }
            // fetch (and discard) the debug state before the next iteration
            let _ = debug_get_trace();
            let _ = debug_get_offsets();
        },
    );
    debug_set_offset_tracking(false);
}

// Midway corruption of a driver reopen event.
//
// Three journals are prepared (see the comment on each initializer), each declaring a
// corruptible event size of `DriverEvent::FULL_EVENT_SIZE`, and are passed to
// `emulate_midway_corruption` with offset tracking enabled. The first callback checks the state
// after opening the corrupted journal (the open must fail); the second callback checks the
// repair result and the state after reopening.
#[sky_macros::test]
fn midway_corruption_reopen() {
    let initializers = [
        Initializer::new(
            "midway_corruption_reopen_close_reopen_close",
            |jrnl_id| {
                /*
                    for this test case we create and close (0) the journal and in the next power cycle we reopen (1) and close (2) the
                    journal. we emulate a midway corruption where the reopen (1) gets corrupted.
                */
                {
                    let mut jrnl = journal_init(jrnl_id)?;
                    RawJournalWriter::close_driver(&mut jrnl)?; // (0)
                }
                {
                    let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1) <-- corrupt
                    RawJournalWriter::close_driver(&mut jrnl)?; // (2)
                }
                Ok(InitializerInfo::new(1, 2))
            },
            DriverEvent::FULL_EVENT_SIZE,
        ),
        Initializer::new(
            /*
                create, apply ([0,99]), close (100). reopen(101), close (102). corrupt (101). expect no data loss
            */
            "midway_corruption_reopen_apply_close_reopen_close",
            |jrnl_id| {
                let op_count;
                {
                    let mut jrnl = journal_init(jrnl_id)?;
                    op_count = apply_event_mix(&mut jrnl)?;
                    RawJournalWriter::close_driver(&mut jrnl)?;
                }
                {
                    let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?;
                    RawJournalWriter::close_driver(&mut jrnl)?;
                }
                // NOTE(review): the corrupted event id is derived from `op_count` while the last
                // event id is hard-coded as 102; sibling tests derive both from the op count.
                // Confirm this agrees with what `apply_event_mix` returns.
                Ok(InitializerInfo::new((op_count + 1) as u64, 102))
            },
            DriverEvent::FULL_EVENT_SIZE,
        ),
        Initializer::new(
            /*
                create, close (0). reopen(1), apply ([2,101]), close (102). corrupt (1). expect full data loss
            */
            "midway_corruption_reopen_apply_post_corrupted_reopen",
            |jrnl_id| {
                {
                    let mut jrnl = journal_init(jrnl_id)?;
                    RawJournalWriter::close_driver(&mut jrnl)?;
                }
                {
                    let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // <-- corrupt this one
                    let _ = apply_event_mix(&mut jrnl)?; // apply mix
                    RawJournalWriter::close_driver(&mut jrnl)?;
                }
                // NOTE(review): last event id is hard-coded; confirm against `apply_event_mix`
                Ok(InitializerInfo::new(1, 102))
            },
            DriverEvent::FULL_EVENT_SIZE,
        ),
    ];
    debug_set_offset_tracking(true); // we need to track offsets
    emulate_midway_corruption(
        initializers,
        |journal_id, modified_journal_info, trim_size, db, open_result| {
            // opening the corrupted journal must fail
            let _ = open_result.unwrap_err();
            // `initializer_id` selects the expectation; presumably it is the position of the
            // initializer in the array above (verify against `emulate_midway_corruption`)
            match modified_journal_info.initializer_id {
                0 | 2 => {
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                }
                1 => {
                    assert_eq!(
                        db.data().len(),
                        POST_TRIALS_SIZE,
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                    assert_eq!(
                        *db.data().last().unwrap(),
                        keyfmt(TRIALS - 1),
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    )
                }
                _ => panic!(),
            }
            // fetch (and discard) the debug state before the next step
            let _ = debug_get_trace();
            let _ = debug_get_offsets();
        },
        |journal_id, modified_journal_info, trim_size, repair_result, db, open_result| {
            // failure messages are built lazily, i.e. only when the result is actually an error
            let _ = open_result.unwrap_or_else(|e| {
                panic!("failed at trim_size {trim_size} for journal {journal_id}: {e:?}")
            });
            let repair_result = repair_result.unwrap_or_else(|e| {
                panic!("failed at trim_size {trim_size} for journal {journal_id}: {e:?}")
            });
            // expected loss: everything from the start of the corruption range to the end of the
            // modified file, plus what remains of the corrupted event after trimming
            assert_eq!(
                repair_result,
                RepairResult::UnspecifiedLoss(
                    ((modified_journal_info.storage.modified_file_size
                        - modified_journal_info.storage.corruption_range.start)
                        + (DriverEvent::FULL_EVENT_SIZE - trim_size)) as u64
                ),
                "failed at trim_size {trim_size} for journal {journal_id}"
            );
            match modified_journal_info.initializer_id {
                0 | 2 => {
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                }
                1 => {
                    assert_eq!(
                        db.data().len(),
                        POST_TRIALS_SIZE,
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                    assert_eq!(
                        *db.data().last().unwrap(),
                        keyfmt(TRIALS - 1),
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    )
                }
                _ => panic!(),
            }
            // fetch (and discard) the debug state before the next iteration
            let _ = debug_get_trace();
            let _ = debug_get_offsets();
        },
    );
    debug_set_offset_tracking(false);
}

// Midway corruption of a server (push) event that is followed by a driver close.
//
// With offset tracking enabled, the test first measures two event sizes (tracked offset of
// event 0 minus the journal header size): one for a push of `KEY` and one for a push of
// `keyfmt(0)`. These sizes are handed to the initializers and are later used to compute the
// loss that the repair is expected to report. Offset tracking is switched off again once
// `emulate_midway_corruption` returns, matching the other midway corruption tests.
#[sky_macros::test]
fn midway_corruption_at_runtime() {
    debug_set_offset_tracking(true);
    // compute offset size
    let event_size_fixed_size_key = {
        let mut jrnl =
            create_journal::<SimpleDBJournal>("midway_corruption_at_runtime_fixed_key").unwrap();
        SimpleDB::new().push(&mut jrnl, KEY).unwrap();
        let (_, offsets) = (debug_get_trace(), debug_get_offsets());
        *offsets.get(&0).unwrap() as usize - SIMPLEDB_JOURNAL_HEADER_SIZE
    };
    // compute offset size
    let event_size_dynamic_key = {
        let mut jrnl =
            create_journal::<SimpleDBJournal>("midway_corruption_at_runtime_dynamic_key").unwrap();
        SimpleDB::new().push(&mut jrnl, keyfmt(0)).unwrap();
        let (_, offsets) = (debug_get_trace(), debug_get_offsets());
        *offsets.get(&0).unwrap() as usize - SIMPLEDB_JOURNAL_HEADER_SIZE
    };
    let initializers = [
        Initializer::new(
            /*
                open, apply (0), close (1). corrupt (0). expect complete loss of the push event (0).
            */
            "midway_corruption_at_runtime_open_server_event_close",
            |jrnl_id| {
                let mut jrnl = journal_init(jrnl_id)?;
                SimpleDB::new().push(&mut jrnl, KEY)?;
                RawJournalWriter::close_driver(&mut jrnl)?;
                Ok(InitializerInfo::new(0, 1))
            },
            event_size_fixed_size_key,
        ),
        Initializer::new(
            /*
                open, apply([0,99]), close (100). corrupt (99). expect complete loss of the last event(99).
            */
            "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_last",
            |jrnl_id| {
                let mut jrnl = journal_init(jrnl_id)?;
                let mut sdb = SimpleDB::new();
                for num in 1..=TRIALS {
                    sdb.push(&mut jrnl, keyfmt(num))?;
                }
                RawJournalWriter::close_driver(&mut jrnl)?;
                Ok(InitializerInfo::new(TRIALS as u64 - 1, TRIALS as u64))
            },
            event_size_dynamic_key,
        ),
        Initializer::new(
            /*
                open, apply([0,99]), close (100). corrupt (0). expect complete loss of all events
            */
            "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_first",
            |jrnl_id| {
                let mut jrnl = journal_init(jrnl_id)?;
                let mut sdb = SimpleDB::new();
                for num in 1..=TRIALS {
                    sdb.push(&mut jrnl, keyfmt(num))?;
                }
                RawJournalWriter::close_driver(&mut jrnl)?;
                Ok(InitializerInfo::new(0, TRIALS as u64))
            },
            event_size_dynamic_key,
        ),
    ];
    emulate_midway_corruption(
        initializers,
        |journal_id, modified_journal_info, trim_size, db, open_result| {
            // opening the corrupted journal must fail
            let _ = open_result.unwrap_err();
            let (_, _) = (debug_get_trace(), debug_get_offsets());
            // `initializer_id` selects the expectation; presumably it is the position of the
            // initializer in the array above (verify against `emulate_midway_corruption`)
            match modified_journal_info.initializer_id {
                0 | 2 => {
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
                        db.data()
                    );
                }
                1 => {
                    // expect to have all keys upto TRIALS - 1
                    assert_eq!(
                        db.data().len(),
                        TRIALS - 1,
                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
                        db.data()
                    );
                    // last key is TRIALS - 1
                    assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1));
                }
                _ => panic!(),
            }
        },
        |journal_id, modified_journal_info, trim_size, repair_result, db, open_result| {
            // the diagnostic reads both journal files, so it is only built when something
            // actually failed instead of on every iteration
            let failure_context = || {
                format!(
                    "failed at trim_size {trim_size} for journal {journal_id}. file data={:?}. original_data={:?}",
                    FileSystem::read(&make_corrupted_file_name(journal_id, trim_size)),
                    FileSystem::read(journal_id),
                )
            };
            let repair_result =
                repair_result.unwrap_or_else(|e| panic!("{}: {e:?}", failure_context()));
            let _ = open_result.unwrap_or_else(|e| panic!("{}: {e:?}", failure_context()));
            match modified_journal_info.initializer_id {
                0 | 2 => {
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
                        db.data()
                    );
                }
                1 => {
                    // expect to have all keys upto TRIALS - 1
                    assert_eq!(
                        db.data().len(),
                        TRIALS - 1,
                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
                        db.data()
                    );
                    // last key is TRIALS - 1
                    assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1));
                }
                _ => panic!(),
            }
            // expected loss: everything from the start of the corruption range to the end of the
            // modified file, plus what remains of the corrupted event after trimming. the event
            // size depends on which key kind the initializer pushed.
            match modified_journal_info.initializer_id {
                0 => {
                    assert_eq!(
                        repair_result,
                        RepairResult::UnspecifiedLoss(
                            ((modified_journal_info.storage.modified_file_size
                                - modified_journal_info.storage.corruption_range.start)
                                + (event_size_fixed_size_key - trim_size))
                                as u64
                        ),
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    )
                }
                1 | 2 => {
                    assert_eq!(
                        repair_result,
                        RepairResult::UnspecifiedLoss(
                            ((modified_journal_info.storage.modified_file_size
                                - modified_journal_info.storage.corruption_range.start)
                                + (event_size_dynamic_key - trim_size))
                                as u64
                        ),
                        "failed at trim_size {trim_size} for journal {journal_id}"
                    );
                }
                _ => panic!(),
            }
            let (_, _) = (debug_get_trace(), debug_get_offsets());
        },
    );
    // don't leave offset tracking enabled once the test is done
    debug_set_offset_tracking(false);
}

/*
    rollback tests
*/

/// Drives a write failure followed by a rollback and verifies the state after reopening.
///
/// Steps:
/// 1. A new log is created
/// 2. Events and corruptions are introduced
/// 3. Rolled back
/// 4. Closed
/// 5. Re-opened
///
/// Arguments:
/// - `journal_id`: name of the journal file; it is removed once the run completes
/// - `action`: applies events to the fresh journal and **must** return an error
/// - `verify_error`: receives the error returned by `action`
/// - `post_rollback`: receives the database that the reopened journal was restored into
///
/// Panics if `action` succeeds, or if any rollback, close, reopen or file removal fails.
fn emulate_failure_for_rollback(
    journal_id: &str,
    action: impl Fn(&mut SimpleDB, &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<()>,
    verify_error: impl Fn(fractal::error::Error),
    post_rollback: impl Fn(&SimpleDB),
) {
    {
        let mut db = SimpleDB::new();
        let mut jrnl = create_journal::<SimpleDBJournal>(journal_id).unwrap();
        let err = action(&mut db, &mut jrnl).unwrap_err();
        verify_error(err);
        for _ in 0..1000 {
            // idempotency guarantee: no matter how many times this is called, the underlying state will rollback to, and only to the last event
            jrnl.__rollback().unwrap();
        }
        RawJournalWriter::close_driver(&mut jrnl).unwrap();
    }
    {
        let db = SimpleDB::new();
        let open_result = journal_open(journal_id, &db);
        // the trace is still fetched right after the open attempt (same order as before), but it
        // is only pretty-printed if the open actually failed
        let trace = debug_get_trace();
        let mut jrnl = open_result.unwrap_or_else(|e| panic!("{trace:#?}: {e:?}"));
        post_rollback(&db);
        RawJournalWriter::close_driver(&mut jrnl).unwrap();
    }
    FileSystem::remove_file(journal_id).unwrap();
}

// A zero-write crash on the very first push: after rollback and reopen the database is empty.
#[sky_macros::test]
fn rollback_write_zero_empty_log() {
    emulate_failure_for_rollback(
        "rollback_empty_log_write_zero",
        |db, jrnl| {
            // the crash is only armed for the duration of this single push
            vfs_utils::debug_enable_zero_write_crash();
            let push_result = db.push(jrnl, "hello, world");
            vfs_utils::debug_disable_write_crash();
            push_result
        },
        |err| {
            let kind = err.kind();
            assert!(
                matches!(kind, ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero),
                "expected write zero, got {kind:?}"
            );
        },
        |db| {
            let restored = db.data();
            assert_eq!(restored.len(), 0)
        },
    );
}

// A zero-write crash after one committed push: after rollback and reopen only the committed key
// is present.
#[sky_macros::test]
fn rollback_write_zero_nonempty_log() {
    emulate_failure_for_rollback(
        "rollback_write_zero_nonempty_log",
        |db, jrnl| {
            // this event is written before the crash is armed
            db.push(jrnl, "my good key")?;
            vfs_utils::debug_enable_zero_write_crash();
            let failed_push = db.push(jrnl, "this won't go in");
            vfs_utils::debug_disable_write_crash();
            failed_push
        },
        |err| {
            let kind = err.kind();
            assert!(
                matches!(kind, ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero),
                "expected write zero, got {kind:?}"
            );
        },
        |db| {
            let restored = db.data();
            assert_eq!(restored.len(), 1);
            assert_eq!(restored[0], "my good key")
        },
    )
}

// Repeats (100 times) a random write crash on the very first push: after rollback and reopen the
// database is empty every time.
#[sky_macros::test]
fn rollback_random_write_failure_empty_log() {
    (0..100).for_each(|_| {
        emulate_failure_for_rollback(
            "rollback_random_write_failure_empty_log",
            |db, jrnl| {
                // the crash is only armed for the duration of this single push
                vfs_utils::debug_enable_random_write_crash();
                let push_result = db.push(jrnl, "hello, world");
                vfs_utils::debug_disable_write_crash();
                push_result
            },
            |err| {
                let kind = err.kind();
                assert!(
                    matches!(kind, ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero),
                    "expected write zero, got {kind:?}"
                );
            },
            |db| {
                let restored = db.data();
                assert_eq!(restored.len(), 0)
            },
        );
    });
}

// Repeats (100 times) a random write crash after one committed push: after rollback and reopen
// only the committed key is present every time.
#[sky_macros::test]
fn rollback_random_write_failure_log() {
    (0..100).for_each(|_| {
        emulate_failure_for_rollback(
            "rollback_random_write_failure_log",
            |db, jrnl| {
                // this event is written before the crash is armed
                db.push(jrnl, "my good key")?;
                vfs_utils::debug_enable_random_write_crash();
                let failed_push = db.push(jrnl, "this won't go in");
                vfs_utils::debug_disable_write_crash();
                failed_push
            },
            |err| {
                let kind = err.kind();
                assert!(
                    matches!(kind, ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero),
                    "expected write zero, got {kind:?}"
                );
            },
            |db| {
                let restored = db.data();
                assert_eq!(restored.len(), 1);
                assert_eq!(restored[0], "my good key")
            },
        )
    });
}
